Skip to content

Conversation

@DouweM
Copy link
Collaborator

@DouweM DouweM commented Sep 22, 2025

This PR introduces a new Beta Graph API for pydantic-graph that provides enhanced capabilities for parallel execution, conditional branching, and complex workflows. The original graph API remains fully supported and is compatible with the new beta API.

Key Features

Builder Pattern Interface

The new API uses a function-based builder pattern with the @g.step decorator instead of the class-based BaseNode approach. This makes it much easier to dynamically define connections between nodes:

from pydantic_graph.beta import GraphBuilder, StepContext
from pydantic_graph.beta.join import reduce_list_append

g = GraphBuilder(input_type=list[int], output_type=list[int])

@g.step
async def square(ctx: StepContext[None, None, int]) -> int:
    return ctx.inputs * ctx.inputs

collect = g.join(reduce_list_append, initial_factory=list[int])

g.add(
    g.edge_from(g.start_node).map().to(square),
    g.edge_from(square).to(collect),
    g.edge_from(collect).to(g.end_node),
)

graph = g.build()

async def main():
    result = await graph.run(inputs=[1, 2, 3, 4, 5])
    # Result: [1, 4, 9, 16, 25]

Steps accept a StepContext argument that is generic in input type, output type, and state type. A state instance is shared across the run similar to how graphs currently work, but steps now also have inputs that come from the immediately preceding step, and outputs that go to the immediately following step. This allows you to design your graph for better type-safety than you typically get when using a global state object that is built incrementally during the course of a graph run. (And you always have the option to leave the inputs/outputs as None and stick with just using/modifying the state during the run.)

More generally, we have thorough type-checking for all edges —

Parallel Execution Operations

  • Map operations (g.map()) - Fan out to process iterables in parallel
  • Broadcast operations (g.broadcast()) - Send the same data to multiple parallel paths
  • Join nodes and Reducers - Aggregate results from parallel execution with built-in reducers

Conditional Branching

  • Decision nodes - Dynamic routing based on runtime conditions
  • Type-safe branching via inputs/outputs, with exhaustiveness checking, also through the type system (!)

Advanced Execution Control

  • Step-by-step execution via graph.iter() for fine-grained control (just like in the previous, non-beta API), even during parallel execution.
  • Access to execution events and task inspection during runtime

Closes #704

@github-actions
Copy link

github-actions bot commented Sep 22, 2025

Docs Preview

commit: 383ed50
Preview URL: https://15ebf6dd-pydantic-ai-previews.pydantic.workers.dev

@dmontagu dmontagu force-pushed the dmontagu/new-graph-api branch from f410d5c to 17555b4 Compare October 23, 2025 22:48
@dmontagu dmontagu merged commit ee1b8d6 into main Oct 24, 2025
31 checks passed
@dmontagu dmontagu deleted the dmontagu/new-graph-api branch October 24, 2025 14:09
@kaben
Copy link

kaben commented Oct 25, 2025

Thank you @dmontagu for your work on this. I've tried the beta graph API and I really like it.

@sirianni
Copy link

sirianni commented Nov 6, 2025

@dmontagu @DouweM thanks for this work!! I don't see any persistence capabilities in the new Graph V2 code. Is this coming later, or is persistence not applicable with the new approach?

@DouweM
Copy link
Collaborator Author

DouweM commented Nov 7, 2025

@sirianni David put some notes on persistence with the new API in #530 (comment)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Parellel node execution in Graphs (AKA Graphs V2)

6 participants